package com.zhida.gooutcrawler.spider;

import com.zhida.gooutcrawler.entity.Page;
import com.zhida.gooutcrawler.http.RetrofitFactory;
import com.zhida.gooutcrawler.spider.pipeline.IPipeline;
import com.zhida.gooutcrawler.spider.processor.IContentProcessor;
import com.zhida.gooutcrawler.spider.schedule.ISchedule;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import okhttp3.ResponseBody;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * 采集的统筹管理类
 * Created by Administrator on 2017-04-05.
 */
public class Spider implements ISpider {

    private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    private ISchedule schedule;
    private IContentProcessor contentProcessor;
    private IPipeline pipeline;

    private SpiderListener spiderListener;

    private final AtomicInteger runningCount = new AtomicInteger(0);
    private final AtomicInteger totalCount = new AtomicInteger(0);

    private boolean running = false;

    private int maxCount = 1;//最大线程数量


    public Spider(ISchedule schedule, IContentProcessor contentProcessor, IPipeline pipeline) {
        this.contentProcessor = contentProcessor;
        this.schedule = schedule;
        this.pipeline = pipeline;
    }

    public void start() {
        running = true;

        System.out.println("spider start method");

        String url = queue.poll();
        if (url != null) {
            synchronized (Spider.class) {
                int current = runningCount.get();
                //创建足够多的数量的请求
                if (current < maxCount) {
                    for (int i = 0; i < maxCount - current; i++) {
                        handleContent(url);
                    }
                }
            }
        } else {
            //通过schedule去获取url列表
            schedule.getList(this);
        }
    }

    public void pause() {
        running = false;
    }

    public void onContentFinish() {
        if (running) {
            start();
        }
    }

    public void handleContent(final String url) {
        if (spiderListener != null) {
            spiderListener.onStartOne(url);
        }
        runningCount.incrementAndGet();
        RetrofitFactory.getGoOutService().get(url)
                .flatMap(new Func1<ResponseBody, Observable<Page>>() {
                    public Observable<Page> call(ResponseBody responseBody) {
                        try {
                            String html = responseBody.string();

                            Page page = new Page();
                            page.setUrl(url);
                            page.setRawContent(html);

                            //解析html
                            if (contentProcessor != null) {
                                contentProcessor.parse(page);
                            }

                            //持久化数据
                            if (pipeline != null) {
                                pipeline.save(page);
                            }

                            return Observable.just(page);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        return Observable.never();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        runningCount.decrementAndGet();
                        totalCount.incrementAndGet();

                        System.out.println("解析完成:" + page.getUrl());
                        if (spiderListener != null) {
                            spiderListener.onFinishOne(page);
                        }

                        onContentFinish();
                    }
                });
    }


    public void onListFinish(List<String> urls) {
        System.out.println("spider onListFinish");

        queue.addAll(urls);
        if (running) {
            start();
        }
    }

    @Override
    public void onListNoMore() {
        System.out.println("spider onListNoMore");

        running = false;
        if (spiderListener != null) {
            spiderListener.onFinishAll();
        }
    }

    public boolean isRunning() {
        return running;
    }

    public int getTotalCount() {
        return totalCount.get();
    }

    public SpiderListener getSpiderListener() {
        return spiderListener;
    }

    public void setSpiderListener(SpiderListener spiderListener) {
        this.spiderListener = spiderListener;
    }
}
